import { Tabs, Callout } from "nextra/components";
import UniversalTabs from "../../../../components/UniversalTabs";

# Worker Affinity Assignment (Beta)

<Callout type="info">
  This feature is currently in beta and may be subject to change.
</Callout>

It is often desirable to assign workflows to specific workers based on criteria such as worker capabilities, resource availability, or location. Worker affinity lets you route a workflow to a worker based on that worker's label state. Labels can be set dynamically on workers to reflect their current state, such as a specific model loaded into memory or specific disk requirements.

Individual steps can then declare the label state they want, so that they are assigned to workers that meet those criteria. If no worker meets the specified criteria, the step run remains in a pending state until a suitable worker becomes available or the step is cancelled (see [Scheduling Timeouts](../timeouts.mdx)).

## Specifying Worker Labels

Labels can be set on workers when they are registered with the Hatchet service. Labels are key-value pairs that describe worker capabilities, resource availability, or other criteria used to match workflows to workers. Values can be strings or numbers, and a worker can have multiple labels.

<UniversalTabs items={['Python', 'Typescript', 'Go']}>
  <Tabs.Tab>

```python
worker = hatchet.worker(
    "affinity-worker",
    labels={
        "model": "fancy-ai-model-v2",
        "memory": 512,
    },
)
```

  </Tabs.Tab>
  <Tabs.Tab>

```typescript
const worker = await hatchet.worker("affinity-worker", {
  labels: {
    model: "fancy-ai-model-v2",
    memory: 512,
  },
});
```

  </Tabs.Tab>
  <Tabs.Tab>

```go
	w, err := worker.NewWorker(
		worker.WithClient(
			c,
		),
		worker.WithLabels(map[string]interface{}{
			"model":  "fancy-ai-model-v2",
			"memory": 512,
		}),
	)
```

  </Tabs.Tab>
</UniversalTabs>

## Specifying Step Desired Labels

You can specify the desired worker label state for a specific step in a workflow by setting desired labels on the step definition: `desired_worker_labels` in Python, `worker_labels` in Typescript, and `SetDesiredLabels` in Go. The value is an object whose keys are the label keys and whose values are objects with the following properties:

- `value`: The desired value of the label
- `comparator` (default: `EQUAL`): The comparison operator to use when matching the label value.
  - `EQUAL`: The label value must be equal to the desired value
  - `NOT_EQUAL`: The label value must not be equal to the desired value
  - `GREATER_THAN`: The label value must be greater than the desired value
  - `GREATER_THAN_OR_EQUAL`: The label value must be greater than or equal to the desired value
  - `LESS_THAN`: The label value must be less than the desired value
  - `LESS_THAN_OR_EQUAL`: The label value must be less than or equal to the desired value
- `required` (default: `true`): Whether the label is required for the step to run. If `true`, the step will remain in a pending state until a worker with the desired label state becomes available. If `false`, the worker will be prioritized based on the sum of the highest matching weights.
- `weight` (optional, default: `100`): The weight of the label. Higher weights are prioritized over lower weights when selecting a worker for the step. If multiple workers have the same highest weight, the worker with the highest sum of weights will be selected. Ignored if `required` is `true`.

<UniversalTabs items={['Python', 'Typescript', 'Go']}>
  <Tabs.Tab>
```python
@hatchet.step(
    desired_worker_labels={
        "model": {
            "value": "fancy-ai-model-v2",
            "weight": 10
        },
        "memory": {
            "value": 256,
            "required": True,
            "comparator": WorkerLabelComparator.GREATER_THAN,
        }
    },
)
async def step(self, context: Context):
    return {"worker": context.worker.id()}
```
  </Tabs.Tab>
  <Tabs.Tab>
```typescript
const affinity: Workflow = {
  id: 'affinity-workflow',
  description: 'test',
  steps: [
    {
      name: 'child-step1',
      worker_labels: {
        model: {
          value: 'fancy-vision-model',
          required: false,
        },
        memory: {
          value: 1024,
          comparator: WorkerLabelComparator.GREATER_THAN_OR_EQUAL,
        },
      },
      run: async (ctx) => {
        // DO WORK
        return { childStep1: 'childStep1 results!' };
      },
    },
  ],
};
```
  </Tabs.Tab>
  <Tabs.Tab>
```go
	err = w.RegisterWorkflow(
		&worker.WorkflowJob{
			On:          worker.Events("user:create:affinity"),
			Name:        "affinity",
			Description: "affinity",
			Steps: []*worker.WorkflowStep{
				worker.Fn(func(ctx worker.HatchetContext) (result *stepOneOutput, err error) {
					return &stepOneOutput{
						Message: ctx.Worker().ID(),
					}, nil
				}).
					SetName("step-one").
					SetDesiredLabels(map[string]*types.DesiredWorkerLabel{
						"model": {
							Value:  "fancy-ai-model-v2",
							Weight: 10,
						},
						"memory": {
							Value:      512,
							Required:   true,
							Comparator: types.ComparatorPtr(types.WorkerLabelComparator_GREATER_THAN),
						},
					}),
			},
		},
	)
```
  </Tabs.Tab>
</UniversalTabs>
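
To make the matching rules above more concrete, here is a minimal, self-contained Python sketch of how desired labels could be evaluated against worker labels. It is an illustration only and not Hatchet's scheduler: `DesiredLabel`, `label_matches`, `score_worker`, and `pick_worker` are hypothetical names, and ranking eligible workers by the plain sum of matching optional weights is a simplifying assumption about the tie-breaking described above.

```python
import operator
from dataclasses import dataclass
from typing import Dict, Optional, Union

LabelValue = Union[str, int]

# Comparator names from the list above, applied as: worker value <op> desired value.
COMPARATORS = {
    "EQUAL": operator.eq,
    "NOT_EQUAL": operator.ne,
    "GREATER_THAN": operator.gt,
    "GREATER_THAN_OR_EQUAL": operator.ge,
    "LESS_THAN": operator.lt,
    "LESS_THAN_OR_EQUAL": operator.le,
}


@dataclass
class DesiredLabel:
    """Hypothetical stand-in for one desired-label entry (defaults follow the list above)."""
    value: LabelValue
    comparator: str = "EQUAL"
    required: bool = True
    weight: int = 100


def label_matches(worker_labels: Dict[str, LabelValue], key: str, desired: DesiredLabel) -> bool:
    """Return True if the worker has the label and it satisfies the comparator."""
    if key not in worker_labels:
        return False
    try:
        return bool(COMPARATORS[desired.comparator](worker_labels[key], desired.value))
    except TypeError:
        # Ordering a string against a number is undefined; treat it as no match.
        return False


def score_worker(worker_labels: Dict[str, LabelValue], desired: Dict[str, DesiredLabel]) -> Optional[int]:
    """Return None if a required label is unmet, else the summed weight of matching optional labels."""
    score = 0
    for key, want in desired.items():
        matched = label_matches(worker_labels, key, want)
        if want.required:
            if not matched:
                return None  # worker is not eligible for this step
        elif matched:
            score += want.weight  # weight only counts for optional labels
    return score


def pick_worker(workers: Dict[str, Dict[str, LabelValue]], desired: Dict[str, DesiredLabel]) -> Optional[str]:
    """Return the eligible worker with the highest score, or None if the step would stay pending."""
    scores = {name: score_worker(labels, desired) for name, labels in workers.items()}
    eligible = {name: s for name, s in scores.items() if s is not None}
    if not eligible:
        return None
    return max(eligible, key=eligible.get)


workers = {
    "worker-a": {"model": "fancy-ai-model-v2", "memory": 512},
    "worker-b": {"model": "other-model", "memory": 1024},
    "worker-c": {"model": "fancy-ai-model-v2", "memory": 128},
}
desired = {
    "model": DesiredLabel(value="fancy-ai-model-v2", required=False, weight=10),
    "memory": DesiredLabel(value=256, comparator="GREATER_THAN"),
}
print(pick_worker(workers, desired))  # worker-a
```

In this sketch, `worker-c` is excluded because its `memory` label fails the required `GREATER_THAN` check, while `worker-a` outranks `worker-b` because it also matches the optional `model` label.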

<Callout type="warning">
  Take extra care when combining worker affinity with the [sticky assignment
  `HARD` strategy](./sticky-assignment.mdx). In this case, it is recommended to
  set desired labels on the first step of the workflow, so that the workflow is
  assigned to a worker that meets the desired criteria and remains on that
  worker for the duration of the workflow.
</Callout>

### Dynamic Worker Labels

Labels can also be set dynamically on workers using the `upsertLabels` method (`upsert_labels` in Python, `UpsertLabels` in Go). This is useful when worker state changes over time, such as when a new model is loaded into memory or when a worker's resource availability changes.

<UniversalTabs items={['Python', 'Typescript', 'Go']}>
  <Tabs.Tab>

```python
@hatchet.step(
    desired_worker_labels={
        "model": {
            "value": "fancy-vision-model",
            "weight": 10,
        },
        "memory": {
            "value": 256,
            "required": True,
            "comparator": WorkerLabelComparator.GREATER_THAN,
        },
    },
)
async def step(self, context: Context):
    if context.worker.get_labels().get("model") != "fancy-vision-model":
        # Mark the model as unset while it is being swapped
        context.worker.upsert_labels({"model": "unset"})
        # DO WORK TO EVICT OLD MODEL / LOAD NEW MODEL
        evictModel()
        loadNewModel("fancy-vision-model")
        # Advertise the newly loaded model
        context.worker.upsert_labels({"model": "fancy-vision-model"})

    return {"worker": context.worker.id()}
```

  </Tabs.Tab>
  <Tabs.Tab>

```typescript
const affinity: Workflow = {
  id: "dynamic-affinity-workflow",
  description: "test",
  steps: [
    {
      name: "child-step1",
      worker_labels: {
        model: {
          value: "fancy-vision-model",
          required: false,
        },
      },
      run: async (ctx) => {
        if (ctx.worker.labels().model !== "fancy-vision-model") {
          await ctx.worker.upsertLabels({ model: undefined });
          await evictModel();
          await loadNewModel("fancy-vision-model");
          await ctx.worker.upsertLabels({ model: "fancy-vision-model" });
        }
        // DO WORK
        return { childStep1: "childStep1 results!" };
      },
    },
  ],
};
```

  </Tabs.Tab>
  <Tabs.Tab>

```go
	err = w.RegisterWorkflow(
		&worker.WorkflowJob{
			On:          worker.Events("user:create:affinity"),
			Name:        "affinity",
			Description: "affinity",
			Steps: []*worker.WorkflowStep{
				worker.Fn(func(ctx worker.HatchetContext) (result *stepOneOutput, err error) {
					model := ctx.Worker().GetLabels()["model"]

					if model != "fancy-vision-model" {
						// Clear the label while the model is being swapped
						ctx.Worker().UpsertLabels(map[string]interface{}{
							"model": nil,
						})
						// Do something to load the model
						evictModel()
						loadNewModel("fancy-vision-model")
						// Advertise the newly loaded model
						ctx.Worker().UpsertLabels(map[string]interface{}{
							"model": "fancy-vision-model",
						})
					}

					return &stepOneOutput{
						Message: ctx.Worker().ID(),
					}, nil
				}).
					SetName("step-one").
					SetDesiredLabels(map[string]*types.DesiredWorkerLabel{
						"model": {
							Value:  "fancy-vision-model",
							Weight: 10,
						},
						"memory": {
							Value:      512,
							Required:   true,
							Comparator: types.ComparatorPtr(types.WorkerLabelComparator_GREATER_THAN),
						},
					}),
			},
		},
	)
```

  </Tabs.Tab>
</UniversalTabs>
